package com.tpvlog.im.gateway.tcp.dispatcher;

import com.tpvlog.im.common.Constants;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 分发系统实例管理器
 */
public class DispatcherInstanceManager {
    /**
     * 分发系统实例地址列表
     */
    private static List<DispatcherInstanceAddress> dispatcherInstanceAddresses = new ArrayList<DispatcherInstanceAddress>();

    static {
        // TODO: 这里正常应该从iplist服务或注册中心获取
        DispatcherInstanceAddress address = new DispatcherInstanceAddress("localhost", "127.0.0.1", 8090);
        dispatcherInstanceAddresses.add(address);
    }

    /**
     * 分发系统实例
     */
    private Map<String, DispatcherInstance> dispatcherInstances = new ConcurrentHashMap<String, DispatcherInstance>();

    private DispatcherInstanceManager() {
    }

    public static DispatcherInstanceManager getInstance() {
        return DispatcherInstanceManagerHolder.INSTANCE;
    }

    /**
     * 初始化，与分发系统建立长连接
     */
    public void init() {
        // 主动跟一批分发系统建立长连接
        for (DispatcherInstanceAddress dispatcherInstanceAddress : dispatcherInstanceAddresses) {
            try {
                connectDispatcherInstance(dispatcherInstanceAddress);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 随机选择一个分发系统实例
     *
     * @return
     */
    public DispatcherInstance chooseDispatcherInstance() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        random.setSeed(System.currentTimeMillis());
        int index = random.nextInt(dispatcherInstances.size());
        DispatcherInstance[] list = dispatcherInstances.values().toArray(new DispatcherInstance[0]);
        return list[index];
    }

    public void removeDispatcherInstance(String dispatcherChannelId) {
        dispatcherInstances.remove(dispatcherChannelId);
    }

    private void connectDispatcherInstance(DispatcherInstanceAddress dispatcherInstanceAddress) throws InterruptedException {
        final EventLoopGroup threadGroup = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();

        client.group(threadGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer(Constants.DELIMITER);
                        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(4096, delimiter));
                        socketChannel.pipeline().addLast(new DispatcherInstanceHandler());
                    }
                });

        ChannelFuture channelFuture = client.connect(dispatcherInstanceAddress.getIp(), dispatcherInstanceAddress.getPort());
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                // 与Dispatcher建立连接成功，缓存Dispatcher实例
                if (channelFuture.isSuccess()) {
                    SocketChannel channel = (SocketChannel) channelFuture.channel();
                    DispatcherInstance dispatcherInstance = new DispatcherInstance(channel);

                    String dispatcherChannelId = channel.remoteAddress().getHostName() + ":" + channel.remoteAddress().getPort();
                    dispatcherInstances.put(dispatcherChannelId, dispatcherInstance);
                    System.out.println("已经跟IM-DISPATCHER系统建立连接，分发系统地址为：" + channelFuture.channel().remoteAddress());
                } else {
                    channelFuture.channel().close();
                    threadGroup.shutdownGracefully();
                }
            }
        });
        channelFuture.sync();
    }


    private static class DispatcherInstanceManagerHolder {
        private static final DispatcherInstanceManager INSTANCE = new DispatcherInstanceManager();
    }

}
